#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/RPCSub.h>
#include <xrpld/rpc/Role.h>
#include <xrpld/rpc/detail/RPCHelpers.h>

#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/resource/Fees.h>

namespace ripple {

/** Handle the `subscribe` RPC command.

    First resolves the subscriber object, then registers it for each kind
    of subscription named in the request.

    The subscriber is one of:
      - `context.infoSub`, when the request has no `url` field; or
      - the RPC subscriber registered under `url` (admin role required),
        which is looked up via `findRpcSub` and created via `make_RPCSub`
        if no entry exists yet.

    Request fields are processed in this order:
      - `streams`: array of stream-name strings.
      - `accounts_proposed` (or the deprecated `rt_accounts`): array of
        accounts.
      - `accounts`: array of accounts.
      - `account_history_tx_stream`: object with a string `account`.
      - `books`: array of order book objects (`taker_pays`, `taker_gets`,
        and optional `taker`, `domain`, `both`/`both_sides`,
        `snapshot`/`state_now`).

    Processing stops at the first invalid value and the error is returned
    immediately. Subscriptions already registered by earlier fields, or by
    earlier elements of the same array, are not undone by this function.

    @param context The request context: parameters, role, API version, the
                   caller's InfoSub (if any), and the application services.
                   `context.loadType` is updated for the more expensive
                   request kinds.
    @return On success, the result object (possibly populated by the
            individual subscription calls and by book snapshots). On
            failure, the error object produced by `rpcError` or
            `RPC::make_param_error`.
*/
Json::Value
doSubscribe(RPC::JsonContext& context)
{
    InfoSub::pointer ispSub;
    Json::Value jvResult(Json::objectValue);

    // Without a connection-bound subscriber the caller has to say where the
    // events should be delivered.
    if (!context.infoSub && !context.params.isMember(jss::url))
    {
        // Must be a JSON-RPC call.
        JLOG(context.j.info()) << "doSubscribe: RPC subscribe requires a url";
        return rpcError(rpcINVALID_PARAMS);
    }

    if (context.params.isMember(jss::url))
    {
        // URL-based subscriptions are restricted to admin callers.
        if (context.role != Role::ADMIN)
            return rpcError(rpcNO_PERMISSION);

        std::string strUrl = context.params[jss::url].asString();
        std::string strUsername = context.params.isMember(jss::url_username)
            ? context.params[jss::url_username].asString()
            : "";
        std::string strPassword = context.params.isMember(jss::url_password)
            ? context.params[jss::url_password].asString()
            : "";

        // DEPRECATED
        // When present, `username` overrides `url_username`.
        if (context.params.isMember(jss::username))
            strUsername = context.params[jss::username].asString();

        // DEPRECATED
        // When present, `password` overrides `url_password`.
        if (context.params.isMember(jss::password))
            strPassword = context.params[jss::password].asString();

        ispSub = context.netOps.findRpcSub(strUrl);
        if (!ispSub)
        {
            JLOG(context.j.debug()) << "doSubscribe: building: " << strUrl;
            try
            {
                auto rspSub = make_RPCSub(
                    context.app.getOPs(),
                    context.app.getIOContext(),
                    context.app.getJobQueue(),
                    strUrl,
                    strUsername,
                    strPassword,
                    context.app.logs());
                ispSub = context.netOps.addRpcSub(
                    strUrl, std::dynamic_pointer_cast<InfoSub>(rspSub));
            }
            // The exception is only read, so catch it by const reference.
            catch (std::runtime_error const& ex)
            {
                return RPC::make_param_error(ex.what());
            }
        }
        else
        {
            JLOG(context.j.trace()) << "doSubscribe: reusing: " << strUrl;

            if (auto rpcSub = std::dynamic_pointer_cast<RPCSub>(ispSub))
            {
                // Why do we need to check isMember against jss::username and
                // jss::password here instead of just setting the username and
                // the password? What about url_username and url_password?
                if (context.params.isMember(jss::username))
                    rpcSub->setUsername(strUsername);

                if (context.params.isMember(jss::password))
                    rpcSub->setPassword(strPassword);
            }
        }
    }
    else
    {
        ispSub = context.infoSub;
    }
    ispSub->setApiVersion(context.apiVersion);

    if (context.params.isMember(jss::streams))
    {
        if (!context.params[jss::streams].isArray())
        {
            JLOG(context.j.info()) << "doSubscribe: streams requires an array.";
            return rpcError(rpcINVALID_PARAMS);
        }

        for (auto const& it : context.params[jss::streams])
        {
            if (!it.isString())
                return rpcError(rpcSTREAM_MALFORMED);

            std::string streamName = it.asString();
            if (streamName == "server")
            {
                context.netOps.subServer(
                    ispSub, jvResult, context.role == Role::ADMIN);
            }
            else if (streamName == "ledger")
            {
                context.netOps.subLedger(ispSub, jvResult);
            }
            else if (streamName == "book_changes")
            {
                context.netOps.subBookChanges(ispSub);
            }
            else if (streamName == "manifests")
            {
                context.netOps.subManifests(ispSub);
            }
            else if (streamName == "transactions")
            {
                context.netOps.subTransactions(ispSub);
            }
            else if (
                streamName == "transactions_proposed" ||
                streamName == "rt_transactions")  // DEPRECATED
            {
                context.netOps.subRTTransactions(ispSub);
            }
            else if (streamName == "validations")
            {
                context.netOps.subValidations(ispSub);
            }
            else if (streamName == "peer_status")
            {
                // The only stream that requires the admin role.
                if (context.role != Role::ADMIN)
                    return rpcError(rpcNO_PERMISSION);
                context.netOps.subPeerStatus(ispSub);
            }
            else if (streamName == "consensus")
            {
                context.netOps.subConsensus(ispSub);
            }
            else
            {
                // Unknown stream name.
                return rpcError(rpcSTREAM_MALFORMED);
            }
        }
    }

    // Prefer `accounts_proposed`; fall back to the deprecated name only when
    // the current one is absent.
    auto accountsProposed = context.params.isMember(jss::accounts_proposed)
        ? jss::accounts_proposed
        : jss::rt_accounts;  // DEPRECATED
    if (context.params.isMember(accountsProposed))
    {
        if (!context.params[accountsProposed].isArray())
            return rpcError(rpcINVALID_PARAMS);

        // An empty result is reported as malformed; this also rejects an
        // empty input array.
        auto ids = RPC::parseAccountIds(context.params[accountsProposed]);
        if (ids.empty())
            return rpcError(rpcACT_MALFORMED);
        context.netOps.subAccount(ispSub, ids, true);
    }

    if (context.params.isMember(jss::accounts))
    {
        if (!context.params[jss::accounts].isArray())
            return rpcError(rpcINVALID_PARAMS);

        auto ids = RPC::parseAccountIds(context.params[jss::accounts]);
        if (ids.empty())
            return rpcError(rpcACT_MALFORMED);
        context.netOps.subAccount(ispSub, ids, false);
        JLOG(context.j.debug()) << "doSubscribe: accounts: " << ids.size();
    }

    if (context.params.isMember(jss::account_history_tx_stream))
    {
        // Not available when the server is configured without transaction
        // tables.
        if (!context.app.config().useTxTables())
            return rpcError(rpcNOT_ENABLED);

        context.loadType = Resource::feeMediumBurdenRPC;
        auto const& req = context.params[jss::account_history_tx_stream];
        if (!req.isMember(jss::account) || !req[jss::account].isString())
            return rpcError(rpcINVALID_PARAMS);

        auto const id = parseBase58<AccountID>(req[jss::account].asString());
        if (!id)
            return rpcError(rpcINVALID_PARAMS);

        if (auto result = context.netOps.subAccountHistory(ispSub, *id);
            result != rpcSUCCESS)
        {
            return rpcError(result);
        }

        jvResult[jss::warning] =
            "account_history_tx_stream is an experimental feature and likely "
            "to be removed in the future";
        JLOG(context.j.debug())
            << "doSubscribe: account_history_tx_stream: " << toBase58(*id);
    }

    if (context.params.isMember(jss::books))
    {
        if (!context.params[jss::books].isArray())
            return rpcError(rpcINVALID_PARAMS);

        // Each entry is only read, so iterate by const reference.
        for (auto const& j : context.params[jss::books])
        {
            if (!j.isObject() || !j.isMember(jss::taker_pays) ||
                !j.isMember(jss::taker_gets) ||
                !j[jss::taker_pays].isObjectOrNull() ||
                !j[jss::taker_gets].isObjectOrNull())
                return rpcError(rpcINVALID_PARAMS);

            Book book;
            // Bind by reference: both sides are read-only below, so there is
            // no need to copy the JSON subtrees for every book.
            Json::Value const& taker_pays = j[jss::taker_pays];
            Json::Value const& taker_gets = j[jss::taker_gets];

            // Parse mandatory currency.
            // NOTE(review): unlike issuer, taker and domain, the currency is
            // not type-checked before asString(). Left unchanged to keep the
            // set of accepted inputs as-is - confirm whether non-string
            // currencies should be rejected.
            if (!taker_pays.isMember(jss::currency) ||
                !to_currency(
                    book.in.currency, taker_pays[jss::currency].asString()))
            {
                JLOG(context.j.info()) << "Bad taker_pays currency.";
                return rpcError(rpcSRC_CUR_MALFORMED);
            }

            // Parse optional issuer.
            // Rejected when: the issuer is present but not a parseable
            // string; exactly one of currency/issuer is empty (operator!);
            // or the issuer equals noAccount().
            if (((taker_pays.isMember(jss::issuer)) &&
                 (!taker_pays[jss::issuer].isString() ||
                  !to_issuer(
                      book.in.account, taker_pays[jss::issuer].asString())))
                // Don't allow illegal issuers.
                || (!book.in.currency != !book.in.account) ||
                noAccount() == book.in.account)
            {
                JLOG(context.j.info()) << "Bad taker_pays issuer.";
                return rpcError(rpcSRC_ISR_MALFORMED);
            }

            // Parse mandatory currency.
            if (!taker_gets.isMember(jss::currency) ||
                !to_currency(
                    book.out.currency, taker_gets[jss::currency].asString()))
            {
                JLOG(context.j.info()) << "Bad taker_gets currency.";
                return rpcError(rpcDST_AMT_MALFORMED);
            }

            // Parse optional issuer.
            // Same rules as for taker_pays above.
            if (((taker_gets.isMember(jss::issuer)) &&
                 (!taker_gets[jss::issuer].isString() ||
                  !to_issuer(
                      book.out.account, taker_gets[jss::issuer].asString())))
                // Don't allow illegal issuers.
                || (!book.out.currency != !book.out.account) ||
                noAccount() == book.out.account)
            {
                JLOG(context.j.info()) << "Bad taker_gets issuer.";
                return rpcError(rpcDST_ISR_MALFORMED);
            }

            // A book must trade two different assets.
            if (book.in.currency == book.out.currency &&
                book.in.account == book.out.account)
            {
                JLOG(context.j.info()) << "taker_gets same as taker_pays.";
                return rpcError(rpcBAD_MARKET);
            }

            std::optional<AccountID> takerID;

            if (j.isMember(jss::taker))
            {
                // Check the type first, as the issuer and domain fields do,
                // so that a non-string taker is reported with the same error
                // as an unparseable one rather than depending on how
                // asString() handles the value.
                if (!j[jss::taker].isString())
                    return rpcError(rpcBAD_ISSUER);

                takerID = parseBase58<AccountID>(j[jss::taker].asString());
                if (!takerID)
                    return rpcError(rpcBAD_ISSUER);
            }

            if (j.isMember(jss::domain))
            {
                uint256 domain;
                if (!j[jss::domain].isString() ||
                    !domain.parseHex(j[jss::domain].asString()))
                {
                    return rpcError(rpcDOMAIN_MALFORMED);
                }
                else
                {
                    book.domain = domain;
                }
            }

            if (!isConsistent(book))
            {
                JLOG(context.j.warn()) << "Bad market: " << book;
                return rpcError(rpcBAD_MARKET);
            }

            context.netOps.subBook(ispSub, book);

            // both_sides is deprecated.
            bool const both =
                (j.isMember(jss::both) && j[jss::both].asBool()) ||
                (j.isMember(jss::both_sides) && j[jss::both_sides].asBool());

            // Also subscribe to the opposite direction of the same book.
            if (both)
                context.netOps.subBook(ispSub, reversed(book));

            // state_now is deprecated.
            if ((j.isMember(jss::snapshot) && j[jss::snapshot].asBool()) ||
                (j.isMember(jss::state_now) && j[jss::state_now].asBool()))
            {
                context.loadType = Resource::feeMediumBurdenRPC;
                std::shared_ptr<ReadView const> lpLedger =
                    context.app.getLedgerMaster().getPublishedLedger();
                // If there is no published ledger, the snapshot is skipped
                // without reporting an error.
                if (lpLedger)
                {
                    Json::Value const jvMarker = Json::Value(Json::nullValue);
                    Json::Value jvOffers(Json::objectValue);

                    // Fetch one page of offers into jvOffers and merge
                    // jvOffers[jss::offers] into jvResult[field]. `asks` reads
                    // the reversed book; any other field reads the book as
                    // given. Results from several books in one request
                    // accumulate in the same result array.
                    auto add = [&](Json::StaticString field) {
                        context.netOps.getBookPage(
                            lpLedger,
                            field == jss::asks ? reversed(book) : book,
                            takerID ? *takerID : noAccount(),
                            false,
                            RPC::Tuning::bookOffers.rdefault,
                            jvMarker,
                            jvOffers);

                        if (jvResult.isMember(field))
                        {
                            Json::Value& results(jvResult[field]);
                            for (auto const& e : jvOffers[jss::offers])
                                results.append(e);
                        }
                        else
                        {
                            jvResult[field] = jvOffers[jss::offers];
                        }
                    };

                    if (both)
                    {
                        add(jss::bids);
                        add(jss::asks);
                    }
                    else
                    {
                        add(jss::offers);
                    }
                }
            }
        }
    }

    return jvResult;
}

}  // namespace ripple
